package com.atguigu.gmall.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.app.func.BeanToJsonStrFunction;
import com.atguigu.gmall.realtime.beans.TrafficHomeDetailPageViewBean;
import com.atguigu.gmall.realtime.utils.DateFormatUtil;
import com.atguigu.gmall.realtime.utils.DorisUtil;
import com.atguigu.gmall.realtime.utils.KafkaUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Home-page and goods-detail-page unique-visitor (UV) statistics.
 *
 * <p>Reads page logs from the Kafka topic {@code dwd_traffic_page_log} and keeps only the
 * {@code home} and {@code good_detail} page views. Per device ({@code common.mid}), keyed state
 * emits at most one UV per page type per visit date. The UVs are summed in 10-second event-time
 * tumbling windows over the whole stream, and the results are written to the Doris table
 * {@code dws_traffic_home_detail_page_view_window}.
 *
 * <p>Processes that must be running: zk, kafka, flume, doris, DwdTrafficBaseLogSplit,
 * DwsTrafficHomeDetailPageViewWindow.
 *
 * @author Felix
 * @date 2023/8/8
 */
public class DwsTrafficHomeDetailPageViewWindow {
    /** {@code page.page_id} value of the home page. */
    private static final String HOME_PAGE_ID = "home";
    /** {@code page.page_id} value of the goods detail page. */
    private static final String DETAIL_PAGE_ID = "good_detail";

    public static void main(String[] args) throws Exception {
        //TODO 1. Basic environment setup
        //1.1 Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 Parallelism
        env.setParallelism(4);
        //TODO 2. Checkpoint settings
        env.enableCheckpointing(5000L);
        //TODO 3. Read data from Kafka
        //3.1 Topic to consume and consumer group
        String topic = "dwd_traffic_page_log";
        String groupId = "dws_traffic_home_detail_group";
        //3.2 Kafka source
        KafkaSource<String> kafkaSource = KafkaUtil.getKafkaSource(topic, groupId);
        //3.3 Consume as a stream (watermarks are assigned later, after filtering)
        DataStreamSource<String> kafkaStrDS
            = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka_source");
        //TODO 4. Convert records: jsonStr -> jsonObj
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaStrDS.map(JSON::parseObject);
        //TODO 5. Keep only home-page and detail-page views
        SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(
            new FilterFunction<JSONObject>() {
                @Override
                public boolean filter(JSONObject jsonObj) throws Exception {
                    // Null-safe: a record without a "page" object is dropped instead of failing the job.
                    String pageId = getPageId(jsonObj);
                    return HOME_PAGE_ID.equals(pageId) || DETAIL_PAGE_ID.equals(pageId);
                }
            }
        );
        // filterDS.print(">>>");

        //TODO 6. Assign watermarks; the event time is the "ts" field
        SingleOutputStreamOperator<JSONObject> withWatermarkDS = filterDS.assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<JSONObject>forMonotonousTimestamps()
                .withTimestampAssigner(
                    new SerializableTimestampAssigner<JSONObject>() {
                        @Override
                        public long extractTimestamp(JSONObject jsonObj, long recordTimestamp) {
                            return jsonObj.getLong("ts");
                        }
                    }
                )
        );
        //TODO 7. Key by device id (common.mid)
        KeyedStream<JSONObject, String> keyedDS
            = withWatermarkDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"));
        //TODO 8. Use keyed state to decide whether this is a home/detail-page unique visitor;
        //        jsonObj -> statistics bean
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> beanDS = keyedDS.process(
            new KeyedProcessFunction<String, JSONObject, TrafficHomeDetailPageViewBean>() {
                // Most recent date on which this device was counted as a home-page UV.
                private ValueState<String> homeLastVisitDateState;
                // Most recent date on which this device was counted as a detail-page UV.
                private ValueState<String> detailLastVisitDateState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    // Both states expire one day after they were last written (default TTL
                    // update type), which covers same-day repeat detection.
                    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();

                    ValueStateDescriptor<String> homeValueStateDescriptor
                        = new ValueStateDescriptor<String>("homeLastVisitDateState", String.class);
                    homeValueStateDescriptor.enableTimeToLive(ttlConfig);
                    homeLastVisitDateState = getRuntimeContext().getState(homeValueStateDescriptor);

                    ValueStateDescriptor<String> detailValueStateDescriptor
                        = new ValueStateDescriptor<String>("detailLastVisitDateState", String.class);
                    detailValueStateDescriptor.enableTimeToLive(ttlConfig);
                    detailLastVisitDateState = getRuntimeContext().getState(detailValueStateDescriptor);
                }

                @Override
                public void processElement(JSONObject jsonObj, Context ctx, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                    String pageId = getPageId(jsonObj);
                    // Visit date derived from the event timestamp
                    Long ts = jsonObj.getLong("ts");
                    String curVisitDate = DateFormatUtil.toDate(ts);

                    long homeUvCt = 0L;
                    long detailUvCt = 0L;

                    // The upstream filter guarantees pageId is either the home or the detail page.
                    if (HOME_PAGE_ID.equals(pageId)) {
                        if (markFirstVisitOfDay(homeLastVisitDateState, curVisitDate)) {
                            homeUvCt = 1L;
                        }
                    } else {
                        if (markFirstVisitOfDay(detailLastVisitDateState, curVisitDate)) {
                            detailUvCt = 1L;
                        }
                    }

                    // Only emit when at least one counter is set; repeat visits produce nothing.
                    if (homeUvCt != 0L || detailUvCt != 0L) {
                        out.collect(new TrafficHomeDetailPageViewBean(
                            "",
                            "",
                            "",
                            homeUvCt,
                            detailUvCt
                        ));
                    }
                }
            }
        );
        // beanDS.print(">>>");
        //TODO 9. Window: 10-second event-time tumbling window over the whole stream
        AllWindowedStream<TrafficHomeDetailPageViewBean, TimeWindow> windowDS
            = beanDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)));
        //TODO 10. Aggregate: sum the UV counters incrementally, then stamp the window bounds
        SingleOutputStreamOperator<TrafficHomeDetailPageViewBean> reduceDS = windowDS.reduce(
            new ReduceFunction<TrafficHomeDetailPageViewBean>() {
                @Override
                public TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean value1, TrafficHomeDetailPageViewBean value2) throws Exception {
                    value1.setHomeUvCt(value1.getHomeUvCt() + value2.getHomeUvCt());
                    value1.setGoodDetailUvCt(value1.getGoodDetailUvCt() + value2.getGoodDetailUvCt());
                    return value1;
                }
            },
            new AllWindowFunction<TrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<TrafficHomeDetailPageViewBean> values, Collector<TrafficHomeDetailPageViewBean> out) throws Exception {
                    String stt = DateFormatUtil.toYmdHms(window.getStart());
                    String edt = DateFormatUtil.toYmdHms(window.getEnd());
                    String curDate = DateFormatUtil.toPartitionDate(window.getStart());

                    // With a preceding reduce, values holds the single pre-aggregated bean.
                    for (TrafficHomeDetailPageViewBean pageViewBean : values) {
                        pageViewBean.setStt(stt);
                        pageViewBean.setEdt(edt);
                        pageViewBean.setCurDate(curDate);
                        out.collect(pageViewBean);
                    }
                }
            }
        );
        //TODO 11. Write the aggregated results to Doris
        reduceDS.print(">>>");
        reduceDS.map(
            new BeanToJsonStrFunction<>()
        ).sinkTo(
            DorisUtil.getDorisSink("dws_traffic_home_detail_page_view_window")
        );
        env.execute();
    }

    /**
     * Returns {@code page.page_id} of a page-log record, or {@code null} when the record has no
     * {@code page} object.
     *
     * @param jsonObj parsed page-log record
     * @return the page id, or {@code null} if absent
     */
    private static String getPageId(JSONObject jsonObj) {
        JSONObject page = jsonObj.getJSONObject("page");
        return page == null ? null : page.getString("page_id");
    }

    /**
     * Decides whether a visit on {@code curVisitDate} is the first one counted for that date.
     * If it is, records the date in {@code lastVisitDateState}.
     *
     * <p>Only a date strictly later than the stored one counts. An out-of-order record from an
     * earlier day must not overwrite the stored date. Otherwise the next record of the later day
     * would be counted as a unique visitor a second time.
     *
     * <p>NOTE(review): the comparison assumes {@code DateFormatUtil.toDate} returns a
     * lexicographically sortable date string (e.g. {@code yyyy-MM-dd}). Confirm this.
     *
     * @param lastVisitDateState keyed state holding the last counted visit date
     * @param curVisitDate       visit date of the current record
     * @return {@code true} if this record is a new unique visit for its date
     * @throws Exception if reading or updating the state fails
     */
    private static boolean markFirstVisitOfDay(ValueState<String> lastVisitDateState, String curVisitDate) throws Exception {
        String lastVisitDate = lastVisitDateState.value();
        if (StringUtils.isEmpty(lastVisitDate) || curVisitDate.compareTo(lastVisitDate) > 0) {
            lastVisitDateState.update(curVisitDate);
            return true;
        }
        return false;
    }
}
